RabbitMQ是一個訊息佇列的服務,透過將訊息(DATA)非同步傳輸與建立多個佇列功能,以此功能實現資料的緩衝、訊息的分發以及解偶。
在系統規模大的情況下,適度使用這類服務能緩解系統的壓力。同類的產品還有:ActiveMQ、Kafka等等。
在此我們要提到Queue使用的通訊協定──AMQP,AMQP的主要架構如下:
基本上RabbitMQ的傳輸如下:
AMQP也未必都是好處,我們當然可以享受在AMQP下解偶的好處,但同時也帶來了一個問題,就是發送方因為未與接收方直接連接,導致其實他無法確認接收方是否真的有收到訊息、Producer只能有確保資料有成功發送到交換器。
而對消費者也是一樣的,Consumer沒有辦法確認Producer是否有發出請求,只能確認沒有從Exchange收到請求而已。
可以參考程式碼:
// 這是接收方的定義
public void sendOrderToQueue(Order order) {
    // 建立 RabbitMQ 連線和 Channel
    Connection connection = factory.newConnection();
    Channel channel = connection.createChannel();
    
    String exchangeName = "order-exchange";
    String routingKey = "order.created";
    // 將訂單數據轉換為字串(可以是 JSON 格式)
    String message = convertOrderToJson(order);
    
    // 發送消息到交換器
    channel.basicPublish(exchangeName, routingKey, null, message.getBytes(StandardCharsets.UTF_8));
}
// 這是接收方的定義
@Configuration
public class RabbitMQConfig {
    public static final String EXCHANGE_NAME = "order-exchange";
    public static final String QUEUE_NAME = "order-queue";
    public static final String ROUTING_KEY = "order.created";
    // 定義 Exchange
    @Bean
    public DirectExchange orderExchange() {
        return new DirectExchange(EXCHANGE_NAME);
    }
    // 定義 Queue
    @Bean
    public Queue orderQueue() {
        return new Queue(QUEUE_NAME, true); // durable = true
    }
    // 定義 Binding (將 Queue 綁定到 Exchange 上,並使用 Routing Key)
    @Bean
    public Binding binding(Queue orderQueue, DirectExchange orderExchange) {
        return BindingBuilder.bind(orderQueue).to(orderExchange).with(ROUTING_KEY);
    }
}
// 這是接收到訊息的地方
// 監聽來自 "order-queue" 的消息
@RabbitListener(queues = RabbitMQConfig.QUEUE_NAME)
public void receiveOrderMessage(String message) {
    // 假設消息是 JSON 格式,這裡可以將它轉回到 Order 對象
    Order order = convertJsonToOrder(message);
    // 處理訂單邏輯
    System.out.println("接收到的訂單: " + order);
}
另外,在Exchange部分,主要有四種發送訊息的方法:
那今天的部分就先到這裡了,我們明天見。